/*
 * Copyright 2021-2022 Open Kunlun Technology <https://www.openkunlun.io>
 */

package io.openkunlun.scaladsl.client

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.event.Logging
import com.google.protobuf.ByteString
import io.openkunlun.scaladsl.serialization.{ DaprObjectSerialization, JsonSerialization }
import io.openkunlun.scaladsl.util.Strings
import io.openkunlun.scaladsl.v1.{ InvokeRequest, InvokeResponse, InvokeService }

import scala.concurrent.{ ExecutionContext, Future }

/**
 * @author ericxin.
 */
private object DaprInvocationClient extends ExtensionId[DaprInvocationClient] with ExtensionIdProvider {
  override def lookup: ExtensionId[DaprInvocationClient] = DaprInvocationClient
  override def createExtension(system: ExtendedActorSystem) = new DaprInvocationClient(system)
}
private class DaprInvocationClient(system: ExtendedActorSystem) extends Extension {

  private val log = Logging(system, getClass)

  private val serialization: DaprObjectSerialization = JsonSerialization(system)
  private val daprClient = DaprGrpcClient(system)

  def invokeMethod[T: Manifest](action: InvokeMethodAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[T] = {
    for {
      result <- if (headers.nonEmpty) {
        var builder = daprClient.client.invokeService()
        headers.foreach(it => builder = builder.addHeader(it._1, it._2))
        for {
          response <- builder.invoke(buildRequest(action))
          result = deserializeInvokeResponse[T](response)
        } yield result
      } else {
        for {
          response <- daprClient.client.invokeService(buildRequest(action))
          result = deserializeInvokeResponse[T](response)
        } yield result
      }
      _ <- if (result.isDefined) Future.unit else Future.failed(new DaprInvocationResultIsEmptyException(action.method))
    } yield result.get
  }

  private def buildRequest(action: InvokeMethodAction)(implicit serialization: DaprObjectSerialization) = {
    if (Strings.isEmpty(action.id)) {
      throw new InvocationIdIsEmptyException()
    }
    if (Strings.isEmpty(action.method)) {
      throw new InvocationMethodIsEmptyException()
    }
    InvokeService(action.id, Some(buildInvokeRequest(action.method, action.data, action.contentType)))
  }

  private def deserializeInvokeResponse[T: Manifest](response: InvokeResponse)(implicit serialization: DaprObjectSerialization): Option[T] = {
    response.data.map(_.value.toByteArray).map(it => serialization.deserialize[T](it))
  }

  private def buildInvokeRequest(method: String, data: AnyRef, contentType: Option[String], httpMethod: Option[String] = None, querystring: Option[String] = None)(implicit serialization: DaprObjectSerialization): InvokeRequest = {
    InvokeRequest(method, serializeData(data), contentType.getOrElse(""))
  }

  private def serializeData(data: AnyRef)(implicit serialization: DaprObjectSerialization): Option[com.google.protobuf.any.Any] = {
    if (data != null) {
      val bytes = serialization.serialize(data)
      Some(com.google.protobuf.any.Any(value = ByteString.copyFrom(bytes)))
    } else Option.empty[com.google.protobuf.any.Any]
  }
}
